package drds.data_propagate.metadata;

import com.google.common.collect.Lists;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
import drds.data_propagate.entry.ClientId;
import drds.data_propagate.entry.position.PositionRange;
import drds.data_propagate.metadata.exception.DataPropagateMetaManagerException;
import lombok.Getter;
import lombok.Setter;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

public class BatchIdToPositionRangeMap {
    @Setter
    @Getter
    private ClientId clientId;
    @Setter
    @Getter
    private Map<Long, PositionRange> batchIdToPositionRangeMap = new MapMaker().makeMap();
    @Setter
    @Getter
    private AtomicLong atomicMaxBatchId = new AtomicLong(1);


    protected BatchIdToPositionRangeMap(ClientId clientId) {
        this.clientId = clientId;
    }

    public static BatchIdToPositionRangeMap create(ClientId clientId) {
        return new BatchIdToPositionRangeMap(clientId);
    }

    public synchronized void addPositionRange(PositionRange positionRange, Long batchId) {
        updateMaxId(batchId);
        batchIdToPositionRangeMap.put(batchId, positionRange);
    }

    public synchronized Long addPositionRange(PositionRange positionRange) {
        Long batchId = atomicMaxBatchId.getAndIncrement();
        batchIdToPositionRangeMap.put(batchId, positionRange);
        return batchId;
    }

    // 核心:min max位置
    public synchronized PositionRange removePositionRange(Long batchId) {
        if (batchIdToPositionRangeMap.containsKey(batchId)) {
            Long minBatchId = Collections.min(batchIdToPositionRangeMap.keySet());
            if (!minBatchId.equals(batchId)) {
                // 检查一下提交的ack/rollback，必须按batchId分出去的顺序提交，否则容易出现丢数据
                throw new DataPropagateMetaManagerException(
                        String.format("batchId:%d is not the firstly:%d", batchId, minBatchId));
            }
            return batchIdToPositionRangeMap.remove(batchId);
        } else {
            return null;
        }
    }

    public synchronized PositionRange getPositionRange(Long batchId) {
        return batchIdToPositionRangeMap.get(batchId);
    }

    public synchronized PositionRange getLastestPositionRange() {
        if (batchIdToPositionRangeMap.size() == 0) {
            return null;
        } else {
            Long batchId = Collections.max(batchIdToPositionRangeMap.keySet());
            return batchIdToPositionRangeMap.get(batchId);
        }
    }

    public synchronized PositionRange getFirstPositionRange() {
        if (batchIdToPositionRangeMap.size() == 0) {
            return null;
        } else {
            Long batchId = Collections.min(batchIdToPositionRangeMap.keySet());
            return batchIdToPositionRangeMap.get(batchId);
        }
    }

    public synchronized Map<Long, PositionRange> listAllPositionRange() {
        Set<Long> batchIdSets = batchIdToPositionRangeMap.keySet();
        List<Long> batchIds = Lists.newArrayList(batchIdSets);
        Collections.sort(Lists.newArrayList(batchIds));

        return Maps.newHashMap(batchIdToPositionRangeMap);
    }

    public synchronized void clearPositionRanges() {
        batchIdToPositionRangeMap.clear();
    }

    private synchronized void updateMaxId(Long batchId) {
        if (atomicMaxBatchId.get() < batchId + 1) {
            atomicMaxBatchId.set(batchId + 1);
        }
    }


}
